1   package org.apache.lucene.index;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License. You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  import java.util.IdentityHashMap;
20  import java.util.Map;
21  
22  import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
23  import org.apache.lucene.util.InfoStream;
24  import org.apache.lucene.util.ThreadInterruptedException;
25  
26  /**
27   * Controls the health status of a {@link DocumentsWriter} sessions. This class
28   * used to block incoming indexing threads if flushing significantly slower than
29   * indexing to ensure the {@link DocumentsWriter}s healthiness. If flushing is
30   * significantly slower than indexing the net memory used within an
31   * {@link IndexWriter} session can increase very quickly and easily exceed the
32   * JVM's available memory.
33   * <p>
34   * To prevent OOM Errors and ensure IndexWriter's stability this class blocks
35   * incoming threads from indexing once 2 x number of available
36   * {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded.
37   * Once flushing catches up and the number of flushing DWPT is equal or lower
38   * than the number of active {@link ThreadState}s threads are released and can
39   * continue indexing.
40   */
41  final class DocumentsWriterStallControl {
42    
43    private volatile boolean stalled;
44    private int numWaiting; // only with assert
45    private boolean wasStalled; // only with assert
46    private final Map<Thread, Boolean> waiting = new IdentityHashMap<>(); // only with assert
47    private final InfoStream infoStream;
48  
49    DocumentsWriterStallControl(LiveIndexWriterConfig iwc) {
50      infoStream = iwc.getInfoStream();
51    }
52    
53    /**
54     * Update the stalled flag status. This method will set the stalled flag to
55     * <code>true</code> iff the number of flushing
56     * {@link DocumentsWriterPerThread} is greater than the number of active
57     * {@link DocumentsWriterPerThread}. Otherwise it will reset the
58     * {@link DocumentsWriterStallControl} to healthy and release all threads
59     * waiting on {@link #waitIfStalled()}
60     */
61    synchronized void updateStalled(boolean stalled) {
62      this.stalled = stalled;
63      if (stalled) {
64        wasStalled = true;
65      }
66      notifyAll();
67    }
68    
69    /**
70     * Blocks if documents writing is currently in a stalled state. 
71     * 
72     */
73    void waitIfStalled() {
74      if (stalled) {
75        synchronized (this) {
76          if (stalled) { // react on the first wakeup call!
77            // don't loop here, higher level logic will re-stall!
78            try {
79              incWaiters();
80              // Defensive, in case we have a concurrency bug that fails to .notify/All our thread:
81              // just wait for up to 1 second here, and let caller re-stall if it's still needed:
82              wait(1000);
83              decrWaiters();
84            } catch (InterruptedException e) {
85              throw new ThreadInterruptedException(e);
86            }
87          }
88        }
89      }
90    }
91    
92    boolean anyStalledThreads() {
93      return stalled;
94    }
95    
96    long stallStartNS;
97  
98    private void incWaiters() {
99      stallStartNS = System.nanoTime();
100     if (infoStream.isEnabled("DW") && numWaiting == 0) {
101       infoStream.message("DW", "now stalling flushes");
102     }
103     numWaiting++;
104     assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
105     assert numWaiting > 0;
106   }
107   
108   private void decrWaiters() {
109     numWaiting--;
110     assert waiting.remove(Thread.currentThread()) != null;
111     assert numWaiting >= 0;
112     if (infoStream.isEnabled("DW") && numWaiting == 0) {
113       long stallEndNS = System.nanoTime();
114       infoStream.message("DW", "done stalling flushes for " + ((stallEndNS - stallStartNS)/1000000.0) + " ms");
115     }
116   }
117   
118   synchronized boolean hasBlocked() { // for tests
119     return numWaiting > 0;
120   }
121   
122   boolean isHealthy() { // for tests
123     return !stalled; // volatile read!
124   }
125   
126   synchronized boolean isThreadQueued(Thread t) { // for tests
127     return waiting.containsKey(t);
128   }
129 
130   synchronized boolean wasStalled() { // for tests
131     return wasStalled;
132   }
133 }